package com.zwcl.common.mq;

import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.zwcl.common.core.config.RedisConfig;
import com.zwcl.common.core.redis.RedisLock;
import com.zwcl.common.core.redis.RedisLockLua;
import com.zwcl.common.core.utils.CollectionUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;

import java.util.ArrayList;
import java.util.List;

@Slf4j
@Async
public abstract class MqScheduleTaskTemplate {
    /**
     * 需要检查的topic名称
     */
    public List<String> topicList = new ArrayList<>();
    /**
     * 默认加锁的key值
     */
    public String lockKey;
    /**
     * 默认加锁30分钟
     */
    public long LOCK_TIME = RedisConfig.DEFAULT_LOCK_EXPIRED_TIME;
    private static String MQ_TASK_LOCK_PREFIX = "mq_task_lock_";


    @Autowired
    private RedisLockLua redisLock;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RocketMQTemplate template;

    public void failMessageRetry() {

        if(StringUtils.isBlank(lockKey)){
            throw new RuntimeException("please set lockKey value");
        }
        log.info("fail mq message retry start!");
        String value = IdWorker.getIdStr();
        boolean lock = redisLock.lock(MQ_TASK_LOCK_PREFIX + lockKey, value, LOCK_TIME, RedisConfig.DEFAULT_TRY_LOCK_TIMEOUT);
        if (!lock) {
            log.error("fail to get mqLockKey");
            return;
        }

        try {
            this.reTry();
        } catch (Exception e) {
            log.error("message retry error, errMsg:{}", e.getMessage(), e);
        } finally {
            redisLock.unlock(MQ_TASK_LOCK_PREFIX + lockKey, value);
        }
        log.info("fail mq message retry success!");
    }

    private void reTry() {
        for(String topic : topicList){
            List<Object> failMsgList = stringRedisTemplate.opsForHash().values(MqConstant.TOPIC_PREFIX +topic);
            log.info("fail mq message : " + JSON.toJSONString(failMsgList));
            if(!CollectionUtils.isEmpty(failMsgList)) {
                for(Object failMsg : failMsgList){
                    BasePayload basePayload = JSON.parseObject(failMsg.toString(), BasePayload.class);
                    SendResult sendResult = template.syncSend(topic, basePayload);
                    if(SendStatus.SEND_OK == sendResult.getSendStatus()){
                        stringRedisTemplate.opsForHash().delete(MqConstant.TOPIC_PREFIX +topic, basePayload.getBizId());
                    }
                }
            }
        }
    }

}